﻿using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using Singer.BackgroundJob.RabbitMQ.BackgroundJobLogs;
using Singer.BackgroundJob.RabbitMQ.Contracts;
using Singer.Core;
using Singer.Middleware.RabbitMQ;
using Singer.Shared.UserContext;
using System.Diagnostics.CodeAnalysis;
using System.Text;
using static Singer.BackgroundJob.RabbitMQ.Contracts.Contants;

namespace Singer.BackgroundJob.RabbitMQ;

/// <summary>
/// Background job trigger: records a job log entry and publishes the matching
/// message to RabbitMQ so that a consumer can run, cancel or restart the job.
/// </summary>
public class BackgroundJobTrigger : IBackgroundJobTrigger
{
    private readonly IRabbitMQConnectionPool _rabbitMQConnectionPool;
    private readonly IUserContextAccessor _userContextAccessor;
    private readonly BackgroundJobLogManager _backgroundJobLogManager;
    private readonly BackgroundJobOptions _options;

    /// <summary>
    /// Creates the trigger from its collaborators.
    /// </summary>
    /// <param name="rabbitMQConnectionPool">Pool used to obtain the RabbitMQ connection named in the options.</param>
    /// <param name="backgroundJobLogManager">Manager used to create, load and update job log entries.</param>
    /// <param name="userContextAccessor">Accessor for the current user context (tenant id / user id) attached to job messages.</param>
    /// <param name="options">Background job options; supplies the connection name and exchange name.</param>
    public BackgroundJobTrigger(
        IRabbitMQConnectionPool rabbitMQConnectionPool,
        BackgroundJobLogManager backgroundJobLogManager,
        IUserContextAccessor userContextAccessor,
        IOptions<BackgroundJobOptions> options)
    {
        _rabbitMQConnectionPool = rabbitMQConnectionPool;
        _backgroundJobLogManager = backgroundJobLogManager;
        _userContextAccessor = userContextAccessor;
        _options = options.Value;
    }

    /// <summary>
    /// Publishes a message that triggers the corresponding background job.
    /// </summary>
    /// <typeparam name="TArgs">Type of the background job arguments.</typeparam>
    /// <param name="args">Background job arguments; must not be null.</param>
    /// <returns>The id of the job log entry created for this job.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="args"/> is null.</exception>
    public async Task<string> PublishAsync<TArgs>([NotNull] TArgs args) where TArgs : IBackgroundJobArgs
    {
        // [NotNull] promises callers that args is non-null once this method returns,
        // so enforce it up front instead of failing somewhere inside the log manager.
        if (args is null)
        {
            throw new ArgumentNullException(nameof(args));
        }

        // Create the job log entry first; its id and routing key drive the MQ message.
        var jobLog = await _backgroundJobLogManager.CreateAsync(args);

        // Push to MQ to trigger execution of the job.
        PublishJobMessageToMQ(jobLog.Id, jobLog.RoutingKey);
        return jobLog.Id;
    }

    /// <summary>
    /// Manually cancels a job.
    /// </summary>
    /// <param name="jobLogId">Id of the background job log entry.</param>
    /// <exception cref="Exception">No job log entry exists for <paramref name="jobLogId"/>.</exception>
    public async Task CancellingAsync(string jobLogId)
    {
        // A message is published and a dedicated consumer handles the cancellation by
        // using the job's cancellation token held in the consumer context.
        // (The consumer context is not used directly here because the background job
        // assembly may be deployed separately.)
        var jobLog = await _backgroundJobLogManager.GetAsync(jobLogId);
        if (jobLog == null)
        {
            // Same guard and message as RestartingAsync: fail fast before changing
            // any state or publishing a cancel message for an unknown job.
            throw new Exception("任务不存在");
        }

        await _backgroundJobLogManager.CancellingAsync(jobLog);

        IConnection connection = _rabbitMQConnectionPool.Get(_options.ConnectionName);
        using IModel channel = connection.CreateModel();
        var message = new BackgroundJobCancelTaskMessage(jobLogId);
        // Published without basic properties (null), i.e. no explicit delivery mode is set.
        channel.BasicPublish(_options.ExchangeName, DEFAULT_ROUTINGKEY_CANCEL_TASK, null, Encoding.UTF8.GetBytes(JsonUtils.ToJson(message)));
    }

    /// <summary>
    /// Manually restarts a job.
    /// </summary>
    /// <param name="jobLogId">Id of the background job log entry.</param>
    /// <exception cref="Exception">No job log entry exists for <paramref name="jobLogId"/>.</exception>
    public async Task RestartingAsync(string jobLogId)
    {
        var jobLog = await _backgroundJobLogManager.GetAsync(jobLogId);
        if (jobLog == null)
        {
            throw new Exception("任务不存在");
        }

        await _backgroundJobLogManager.ReStartingAsync(jobLog);

        // Re-publish the job message with the routing key stored on the log entry.
        PublishJobMessageToMQ(jobLogId, jobLog.RoutingKey);
    }

    #region Common

    /// <summary>
    /// Pushes a job message onto the message queue.
    /// </summary>
    /// <param name="jobLogId">Id of the background job log entry.</param>
    /// <param name="routingKey">Routing key the message is published with.</param>
    private void PublishJobMessageToMQ(string jobLogId, string routingKey)
    {
        // The user context may be unavailable; tenant id and user id are then passed as null.
        var userContext = _userContextAccessor?.UserContext;
        var message = new BackgroundJobMessage(jobLogId, userContext?.TenantId, userContext?.UserId);

        IConnection connection = _rabbitMQConnectionPool.Get(_options.ConnectionName);
        // The channel is created per publish and disposed when this method exits.
        using IModel channel = connection.CreateModel();
        var properties = channel.CreateBasicProperties();
        properties.DeliveryMode = 2; // Persistent message: intended to survive a broker restart.
        channel.BasicPublish(_options.ExchangeName, routingKey, properties, Encoding.UTF8.GetBytes(JsonUtils.ToJson(message)));
    }

    #endregion
}
